package freedom.needle.queue;

import java.util.LinkedHashSet;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FIFO,block,distinct,thread safe
 * 
 * @author caoqf
 *
 * @param <E>
 */
public class BlockingDistinctQueue<E> {
	
	protected final Logger logger = LoggerFactory.getLogger(getClass());
	
	public static final int DEFAULT_CAPACITY = 100;
	public static final int CALL_GC_LIMIT = 10000;
	private static final String info = "destroy %d times. spend %d millis. call gc.";
	
	private int waitingCapacity;
	private int doingCapacity;
	private int destroyTimes = 0;
	private long lastDestroyTime = System.currentTimeMillis();
	private LinkedHashSet<E> waitingSet, doingSet;
	private final Lock lock = new ReentrantLock();
	private Condition waitingSetNotFull = lock.newCondition();
	private Condition waitingSetNotEmpty = lock.newCondition();
	private Condition doingSetNotFull = lock.newCondition();
	
	public BlockingDistinctQueue() {
		this(DEFAULT_CAPACITY);
	}

	public BlockingDistinctQueue(int capacity) {
		super();
		this.waitingCapacity = capacity;
		this.doingCapacity = capacity;
		
		this.waitingSet = new LinkedHashSet<E>(waitingCapacity);
		this.doingSet = new LinkedHashSet<E>(doingCapacity);
	}
	
	public boolean put(E object) throws InterruptedException {
		//not accept null
		if(object != null) {
			lock.lock();
			try {
				//full wait
				while(this.waitingCapacity == this.waitingSet.size()) {
					logger.debug("waitingSet is full! put wait...");
					this.waitingSetNotFull.await();
				}
				
				//not accept repeat object
				if(this.doingSet.contains(object)) {
					logger.debug("already in doingSet! put refused");
					return false;
				}
				boolean added = this.waitingSet.add(object);
				if(added) {
					logger.debug("put success");	
					this.waitingSetNotEmpty.signal();
					return true;
				} 
				//not accept repeat object
				else {
					logger.debug("already in waitingSet! put refused");
					return false;
				}
				
			} finally {
				lock.unlock();
			}
		}
		
		return false;
	}
	
	public E take() throws InterruptedException {
		lock.lock();
		try {
			//empty wait
			while(this.waitingSet.isEmpty()) {
				logger.debug("waitingSet is empty! take wait...");
				this.waitingSetNotEmpty.await();
			}
			
			//full wait
			while(this.doingCapacity == this.doingSet.size()) {
				logger.debug("doingSet is full! take wait...");
				this.doingSetNotFull.await();
			}
			
			//first object
			E object = this.waitingSet.iterator().next();
			boolean removed = this.waitingSet.remove(object);
			boolean added = false;
			if(removed)
				added = this.doingSet.add(object);
			else {
				logger.debug("not in waitingSet!");
			}
			
			if(added) {
				logger.debug("take success");	
				this.waitingSetNotFull.signal();
				return object;
			}
			else {
				logger.debug("already in doingSet!");
			}
		} finally {
			lock.unlock();
		}
		return null;
	}
	
	/**
	 * destroy object took from queue.
	 * must do after using!
	 * @param object
	 * @return
	 */
	public boolean destroy(E object) {
		if(object != null) {
			lock.lock();
			try {
				boolean removed = doingSet.remove(object);
				if(removed) {
					doingSetNotFull.signal();
					this.destroyTimes ++;
					
					if(destroyTimes == CALL_GC_LIMIT) {
						if(logger.isInfoEnabled()) {
							long now = System.currentTimeMillis();
							long spend = now - lastDestroyTime;
							logger.info(String.format(info, destroyTimes, spend));
							destroyTimes = 0;
							lastDestroyTime = now;
						}
					}
					return true;
				}
				else {
					logger.debug("not in doingSet!");
				}
				
			} finally {
				lock.unlock();
			}
		}
			
		return false;
	}
	
	public boolean notFull() throws InterruptedException {
		lock.lock();
		try {
			//full wait
			while(this.waitingCapacity == this.waitingSet.size()) {
				logger.debug("waitingSet is full! put wait...");
				this.waitingSetNotFull.await();
			}
			return true;
		} finally {
			lock.unlock();
		}
	}
	
	public boolean notEmpty() throws InterruptedException {
		lock.lock();
		try {
			//empty wait
			while(this.waitingSet.isEmpty()) {
				logger.debug("waitingSet is empty! take wait...");
				this.waitingSetNotEmpty.await();
			}
			return true;
		} finally {
			lock.unlock();
		}
	}

}
